﻿using MQTTnet;
#if NETSTANDARD2_1_OR_GREATER
using MQTTnet.Client;
#endif
using MQTTnet.Formatter;
using MQTTnet.Packets;
using MQTTnet.Protocol;

using MQTTNet.Client.Common;
using MQTTNet.Client.Enums;
using MQTTNet.Client.Handles;

using System;
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Text.Json.Serialization;
using System.Threading;
using System.Threading.Tasks;

namespace MQTTNet.Client
{
    public class MQTTNetClient : IMQTTNetClient
    {
        /// <summary>Invoked with the client id each time the underlying client reports a successful connection.</summary>
        public Action<string> OnConnectionHandle { get ; set; }
        /// <summary>
        /// Invoked with the topic and the UTF-8 decoded payload of an incoming message that was neither
        /// consumed as a pending Publish&lt;T&gt; reply nor handled by a route.
        /// </summary>
        public Action<string, string> OnMessageHandle { get; set; }
        /// <summary>Invoked with the client id each time the underlying client reports a disconnect.</summary>
        public Action<string> OnDisConnectionHandle { get; set; }
        /// <summary>Invoked with the client id, a short description of the failing operation and the exception.</summary>
        public Action<string,string, Exception> OnErrorHandle { get; set; }
        // Callback supplied to Connection; it is invoked with this instance before the connection is opened.
        private Action<IMQTTNetClient>? _action = null;
        // Route table created in the constructor; its routes seed SubscribeTopic and it gets first refusal on incoming messages.
        private IRouteHandle _routeHandle;

        /// <summary>Client id sent to the broker; a GUID in "N" format is generated when the caller supplies none.</summary>
        public string? ClientId { get; private set; }
        // Broker address (validated as an IP or domain name by Connection).
        private string _uri = string.Empty;
        // Broker port.
        private int _port = 1883;
        // Credentials handed to the options builder.
        private string _userName = string.Empty;
        private string _password = string.Empty;
        // Set by Connection and cleared by DisConnection. While false, Publish, PublishBinary, Subscribe and
        // UnSubscribe return early and a disconnect does not trigger a reconnect.
        private bool IsRun = true;
        // Underlying MQTTnet client; null until Connection has created it.
        private IMqttClient? mqtt = null;
        // Protocol version handed to the options builder.
        private MQTTVersionEnum MQTTVersion = MQTTVersionEnum.V311;
        /// <summary>True when a client exists and reports that it is connected.</summary>
        public bool IsConnected { get => mqtt?.IsConnected ?? false;}
        // Topic -> QoS table: seeded from the routes, maintained by Subscribe/UnSubscribe and subscribed on every connect.
        private Dictionary<string,MqttQualityOfServiceLevel> SubscribeTopic = new Dictionary<string, MqttQualityOfServiceLevel>();
        // Reply topic -> channel on which a pending Publish<T> call waits for its answer.
        private Dictionary<string, MpscChannel<string>> _publish_ret = new Dictionary<string, MpscChannel<string>>();
        // Scale factor of 1000 used by Publish<T> to express its reply wait (30 * second).
        private const int second = 1000;
        /// <summary>
        /// Creates a client and seeds the subscription table with the topic and QoS of every route
        /// exposed by a new <see cref="RouteHandle"/>.
        /// </summary>
        /// <param name="identity">Topic identity forwarded to the <see cref="RouteHandle"/> constructor.</param>
        public MQTTNetClient(int identity=0)
        {
            _routeHandle = new RouteHandle(identity);
            _routeHandle.Routes.ForEach(route =>
            {
                lock (SubscribeTopic)
                {
                    try
                    {
                        // The first route registered for a topic decides its QoS.
                        if (!SubscribeTopic.ContainsKey(route.Topic))
                        {
                            SubscribeTopic.Add(route.Topic, route.QOS.GetQOS());
                        }
                    }
                    catch (Exception ex)
                    {
                        // Registration stays best effort: one bad route must not prevent construction.
                        // OnErrorHandle cannot be assigned yet at this point, so leave a debug trace
                        // instead of discarding the failure silently.
                        System.Diagnostics.Debug.WriteLine($"MQTTNetClient: route skipped while seeding subscriptions: {ex}");
                    }
                }
            });
        }
        /// <summary>
        /// Creates a client and stores the broker endpoint, credentials and protocol version to use
        /// when <c>Connection</c> is called.
        /// </summary>
        /// <param name="uri">Address of the MQTT broker.</param>
        /// <param name="port">Port of the MQTT broker.</param>
        /// <param name="UserName">User name.</param>
        /// <param name="Password">Password.</param>
        /// <param name="clientId">Client id; a GUID in "N" format is generated when null or empty.</param>
        /// <param name="MQTTVersion">Protocol version (defaults to V311 here).</param>
        /// <param name="Identity">Topic identity forwarded to the single-argument constructor.</param>
        public MQTTNetClient(
            string uri,
            int port = 1883,
            string UserName = "",
            string Password = "",
            string clientId = "",
            MQTTVersionEnum MQTTVersion = MQTTVersionEnum.V311,
            int Identity = 0)
            : this(Identity)
        {
            if (string.IsNullOrEmpty(clientId))
            {
                ClientId = Guid.NewGuid().ToString("N");
            }
            else
            {
                ClientId = clientId;
            }

            // The parameter hides the field of the same name, hence the explicit "this.".
            this.MQTTVersion = MQTTVersion;
            _password = Password;
            _userName = UserName;
            _port = port;
            _uri = uri;
        }
        /// <summary>
        /// Stores the broker endpoint, credentials and protocol version to use when
        /// <c>Connection</c> is called. Nothing is connected here.
        /// </summary>
        /// <param name="uri">Address of the MQTT broker.</param>
        /// <param name="port">Port of the MQTT broker.</param>
        /// <param name="UserName">User name.</param>
        /// <param name="Password">Password.</param>
        /// <param name="clientId">Client id; a GUID in "N" format is generated when null or empty.</param>
        /// <param name="MQTTVersion">Protocol version (defaults to V500 here).</param>
        public void InitMQTTClient(
            string uri,
            int port = 1883,
            string UserName = "",
            string Password = "",
            string clientId = "",
            MQTTVersionEnum MQTTVersion = MQTTVersionEnum.V500)
        {
            _uri = uri;
            _port = port;
            _userName = UserName;
            _password = Password;
            // The parameter hides the field of the same name, hence the explicit "this.".
            this.MQTTVersion = MQTTVersion;

            bool needsGeneratedId = string.IsNullOrEmpty(clientId);
            ClientId = needsGeneratedId ? Guid.NewGuid().ToString("N") : clientId;
        }
        /// <summary>
        /// Validates the configured endpoint, creates a fresh MQTT client, wires its events and
        /// connects to the broker.
        /// </summary>
        /// <param name="action">Optional callback invoked with this instance before the new client is created.</param>
        /// <returns>
        /// <c>true</c> when the broker answers with <c>Success</c>; <c>false</c> when the address or port
        /// is invalid, the broker answers with another code, or an exception occurs (the exception is
        /// reported through <c>OnErrorHandle</c>).
        /// </returns>
        public async Task<bool> Connection(Action<IMQTTNetClient>? action=null)
        {
            try
            {
                if (!this._uri.IsIP() && !this._uri.IsDomain())
                    return false;
                if (!this._port.IsPort())
                    return false;
                this.IsRun = true;
                this._action = action;
                this._action?.Invoke(this);

                // A client left over from an earlier call would otherwise stay alive with our handlers
                // attached, so every event would be delivered once per leaked instance. Detach first so
                // that disposing it cannot trigger the reconnect logic.
                var previous = mqtt;
                if (previous != null)
                {
                    previous.ConnectedAsync -= Mqtt_ConnectedAsync;
                    previous.DisconnectedAsync -= Mqtt_DisRunAsync;
                    previous.ApplicationMessageReceivedAsync -= Mqtt_ApplicationMessageReceivedAsync;
                    previous.Dispose();
                    mqtt = null;
                }

                #if NETSTANDARD2_1_OR_GREATER
                mqtt = new MqttFactory().CreateMqttClient();
                #elif NET8_0_OR_GREATER
                mqtt = new MqttClientFactory().CreateMqttClient();
                #endif
                mqtt.ConnectedAsync += Mqtt_ConnectedAsync;
                mqtt.DisconnectedAsync += Mqtt_DisRunAsync;
                mqtt.ApplicationMessageReceivedAsync += Mqtt_ApplicationMessageReceivedAsync;
                var option = new MqttClientOptionsBuilder()
                      .CrateMqttClientOptions(this._uri, this._port, this._userName, this._password
                      , this.ClientId,MQTTVersion);
                var result = await mqtt.ConnectAsync(option);
                return result.ResultCode == MqttClientConnectResultCode.Success;
            }
            catch (Exception ex)
            {
                _ = Task.Run(() => ErrorMessageDelegate("MQTT连接异常", ex));
                return false;
            }
        }

        /// <summary>
        /// Stops automatic reconnects and asks the underlying client to disconnect.
        /// </summary>
        /// <returns><c>false</c> when no client has been created yet; otherwise the result of <c>TryDisconnectAsync</c>.</returns>
        public async Task<bool> DisConnection()
        {
            if (mqtt is null)
            {
                return false;
            }

            // Cleared before disconnecting so the disconnect event does not start a reconnect.
            IsRun = false;
            bool disconnected = await mqtt.TryDisconnectAsync();
            return disconnected;
        }

        /// <summary>
        /// Disconnects from the broker, detaches the event handlers and disposes the underlying client.
        /// Failures while disconnecting are reported through <c>OnErrorHandle</c> instead of being thrown.
        /// </summary>
        public void Dispose()
        {
            // Cleared first so the disconnect event raised below does not start a reconnect.
            IsRun = false;

            var client = mqtt;
            if (client == null)
                return;

            try
            {
                // Dispose is synchronous, so the wait is unavoidable; running the disconnect on the
                // thread pool keeps it from deadlocking on a captured synchronization context.
                Task.Run(() => client.TryDisconnectAsync()).GetAwaiter().GetResult();
            }
            catch (Exception ex)
            {
                _ = Task.Run(() => ErrorMessageDelegate("释放连接异常", ex));
            }
            finally
            {
                // Handlers stay attached during the disconnect so OnDisConnectionHandle is still raised.
                client.ConnectedAsync -= Mqtt_ConnectedAsync;
                client.DisconnectedAsync -= Mqtt_DisRunAsync;
                client.ApplicationMessageReceivedAsync -= Mqtt_ApplicationMessageReceivedAsync;
                client.Dispose();
                mqtt = null;
            }
        }

        /// <summary>
        /// Publishes a text message.
        /// </summary>
        /// <param name="topic">Topic to publish to.</param>
        /// <param name="message">Text payload.</param>
        /// <param name="qos">Quality of service, cast directly to the MQTTnet level.</param>
        /// <returns><c>false</c> when no client exists or the client has been stopped; otherwise the publish result's <c>IsSuccess</c>.</returns>
        public async Task<bool> Publish(string topic, string message, QOSEnum qos = QOSEnum.ExactlyOnce)
        {
            if (mqtt is null || !IsRun)
            {
                return false;
            }

            MqttQualityOfServiceLevel level = (MqttQualityOfServiceLevel)qos;
            var outcome = await mqtt.PublishStringAsync(topic, message, level);
            return outcome.IsSuccess;
        }
        /// <summary>
        /// Request/response helper: subscribes to <paramref name="subTopic"/>, publishes
        /// <paramref name="message"/> to <paramref name="topic"/> and waits up to 30 seconds for the
        /// first message on <paramref name="subTopic"/>, which is deserialized with <c>JsonToObject</c>.
        /// </summary>
        /// <typeparam name="T">Type the JSON reply is converted to.</typeparam>
        /// <param name="topic">Topic the request is published to.</param>
        /// <param name="message">Request payload.</param>
        /// <param name="subTopic">Topic the reply is expected on.</param>
        /// <param name="pushqos">Quality of service used for the publish.</param>
        /// <param name="subqos">Quality of service used for the reply subscription.</param>
        /// <returns>
        /// The deserialized reply, or <c>default(T)</c> when the client is missing or not connected,
        /// any string argument is null or empty, or no reply arrives in time.
        /// </returns>
        public async Task<T> Publish<T>(string topic, string message, string subTopic, QOSEnum pushqos = QOSEnum.ExactlyOnce, QOSEnum subqos = QOSEnum.ExactlyOnce)
        {
            // The previous guard was "!mqtt?.IsConnected ?? false || ...". Because "??" binds more
            // loosely than "||", the argument checks only ran when there was no client at all.
            var client = mqtt;
            if (client == null || !client.IsConnected
                || string.IsNullOrEmpty(topic) || string.IsNullOrEmpty(message) || string.IsNullOrEmpty(subTopic))
                return default(T);

            // Register the channel before subscribing so a reply that arrives immediately is not
            // routed elsewhere. An entry left behind for the same reply topic is replaced, otherwise
            // this call could never be answered.
            MpscChannel<string> channel = new MpscChannel<string>();
            lock (_publish_ret)
            {
                _publish_ret[subTopic] = channel;
            }

            try
            {
                await client.SubscribeAsync(subTopic, (MqttQualityOfServiceLevel)subqos);
                await client.PublishStringAsync(topic, message, (MqttQualityOfServiceLevel)pushqos);

                // Measure real elapsed time: counting 1 ms delays overshoots badly because a timer
                // tick is usually much longer than 1 ms.
                var elapsed = System.Diagnostics.Stopwatch.StartNew();
                while (elapsed.ElapsedMilliseconds < 30 * second)
                {
                    if (channel.TryReceive(out string str))
                        return str.JsonToObject<T>();
                    // Yield instead of blocking a thread while polling.
                    await Task.Delay(1);
                }
                return default(T);
            }
            finally
            {
                // After a timeout or a failure the channel is still registered; remove it (only if it
                // is still ours) so the next message on this topic is not swallowed by a dead waiter.
                lock (_publish_ret)
                {
                    if (_publish_ret.TryGetValue(subTopic, out var pending) && ReferenceEquals(pending, channel))
                        _publish_ret.Remove(subTopic);
                }
            }
        }
        /// <summary>
        /// Publishes a binary message.
        /// </summary>
        /// <param name="topic">Topic to publish to.</param>
        /// <param name="message">Payload bytes.</param>
        /// <param name="qos">Quality of service, cast directly to the MQTTnet level.</param>
        /// <returns><c>false</c> when no client exists or the client has been stopped; otherwise the publish result's <c>IsSuccess</c>.</returns>
        public async Task<bool> PublishBinary(string topic, IEnumerable<byte> message, QOSEnum qos = QOSEnum.ExactlyOnce)
        {
            if (mqtt is null || !IsRun)
            {
                return false;
            }

            MqttQualityOfServiceLevel level = (MqttQualityOfServiceLevel)qos;
            var outcome = await mqtt.PublishBinaryAsync(topic, payload: message, qualityOfServiceLevel: level);
            return outcome.IsSuccess;
        }
        /// <summary>
        /// Records the topic in the subscription table (so it is subscribed again after a reconnect)
        /// and subscribes to it on the broker. A topic that was already recorded is unsubscribed first.
        /// </summary>
        /// <param name="topic">Topic filter to subscribe to.</param>
        /// <param name="qos">Quality of service, cast directly to the MQTTnet level.</param>
        /// <returns>A task that completes when the broker calls have finished; failures are reported through <c>OnErrorHandle</c>.</returns>
        public async Task Subscribe(string topic, QOSEnum qos = QOSEnum.ExactlyOnce)
        {
            try
            {
                var client = mqtt;
                if (client == null || !this.IsRun) return;

                var level = (MqttQualityOfServiceLevel)qos;
                bool alreadySubscribed;
                // Only the table update happens under the lock; broker round trips are awaited
                // outside it so no thread is blocked while holding the lock.
                lock (SubscribeTopic)
                {
                    alreadySubscribed = SubscribeTopic.ContainsKey(topic);
                    SubscribeTopic[topic] = level;
                }

                // Await the unsubscribe so it cannot overtake the subscribe that follows.
                if (alreadySubscribed)
                    await client.UnsubscribeAsync(topic);
                await client.SubscribeAsync(topic, level);
            }
            catch (Exception ex)
            {
                _ = Task.Run(() => ErrorMessageDelegate("订阅主题异常", ex));
            }
        }
        /// <summary>
        /// Records the topic in the subscription table, registers a route for it and, when the route
        /// was accepted, subscribes to the topic on the broker. A topic that was already recorded is
        /// unsubscribed first.
        /// </summary>
        /// <param name="topic">Topic filter to subscribe to.</param>
        /// <param name="method">Forwarded to <c>AddRoute</c> (presumably the handler method name; confirm against IRouteHandle).</param>
        /// <param name="type">Forwarded to <c>AddRoute</c> (presumably the type declaring the handler; confirm against IRouteHandle).</param>
        /// <param name="qos">Quality of service, cast directly to the MQTTnet level.</param>
        /// <returns>A task that completes when the broker calls have finished; failures are reported through <c>OnErrorHandle</c>.</returns>
        public async Task Subscribe(string topic, string method, Type type, QOSEnum qos = QOSEnum.ExactlyOnce)
        {
            try
            {
                var client = mqtt;
                if (client == null || !this.IsRun) return;

                var level = (MqttQualityOfServiceLevel)qos;
                bool alreadySubscribed;
                // Only the table update happens under the lock; broker round trips are awaited
                // outside it so no thread is blocked while holding the lock.
                lock (SubscribeTopic)
                {
                    alreadySubscribed = SubscribeTopic.ContainsKey(topic);
                    SubscribeTopic[topic] = level;
                }

                // Await the unsubscribe so it cannot overtake the subscribe that follows.
                if (alreadySubscribed)
                    await client.UnsubscribeAsync(topic);
                if (_routeHandle.AddRoute(topic, method, type, qos))
                    await client.SubscribeAsync(topic, level);
            }
            catch (Exception ex)
            {
                _ = Task.Run(() => ErrorMessageDelegate("订阅主题至反射函数异常", ex));
            }
        }
        /// <summary>
        /// Removes the topic from the subscription table and unsubscribes from it on the broker.
        /// </summary>
        /// <param name="topic">Topic filter to unsubscribe from.</param>
        /// <returns>A task that completes when the broker call has finished; failures are reported through <c>OnErrorHandle</c>.</returns>
        public async Task UnSubscribe(string topic)
        {
            try
            {
                var client = mqtt;
                if (client == null || !this.IsRun) return;

                lock (SubscribeTopic)
                {
                    // Remove is a no-op when the topic is not present.
                    SubscribeTopic.Remove(topic);
                }

                // Awaited so that a failing unsubscribe reaches the catch below instead of becoming an
                // unobserved task exception. ConfigureAwait(false) keeps a caller that blocks on the
                // returned task from deadlocking on a captured synchronization context.
                await client.UnsubscribeAsync(topic).ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                _ = Task.Run(() => ErrorMessageDelegate("取消订阅主题异常", ex));
            }
        }
        /// <summary>
        /// Connected event handler: notifies <c>OnConnectionHandle</c> and subscribes to every topic
        /// recorded in the subscription table.
        /// </summary>
        /// <param name="arg">Event data supplied by the MQTT client (not used).</param>
        /// <returns>A task that completes once the subscribe request has finished.</returns>
        private async Task Mqtt_ConnectedAsync(MqttClientConnectedEventArgs arg)
        {
            OnConnectionHandle?.Invoke(this.ClientId);

            try
            {
                // Snapshot under the lock: Subscribe/UnSubscribe may change the table from other
                // threads, and enumerating a dictionary while it is modified throws.
                List<MqttTopicFilter> topicFilters;
                lock (SubscribeTopic)
                {
                    topicFilters = SubscribeTopic.Select(s => new MqttTopicFilter()
                    {
                        Topic = s.Key,
                        RetainAsPublished = false,
                        QualityOfServiceLevel = s.Value
                    }).ToList();
                }

                var client = mqtt;
                if (client == null || topicFilters.Count == 0)
                    return;

                MqttClientSubscribeOptions subscribeOptions = new MqttClientSubscribeOptions()
                {
                    TopicFilters = topicFilters
                };
                // Awaited so a failed subscribe is reported instead of being lost in a discarded task.
                await client.SubscribeAsync(subscribeOptions);
            }
            catch (Exception ex)
            {
                _ = Task.Run(() => ErrorMessageDelegate("连接成功后订阅主题异常", ex));
            }
        }
        /// <summary>
        /// Disconnected event handler: notifies <c>OnDisConnectionHandle</c> and, unless the client
        /// has been stopped, starts a reconnect.
        /// </summary>
        /// <param name="arg">Event data supplied by the MQTT client (not used).</param>
        /// <returns>A task that completes when the notification and the reconnect attempt have finished.</returns>
        private async Task Mqtt_DisRunAsync(MqttClientDisconnectedEventArgs arg)
        {
            try
            {
                var listener = OnDisConnectionHandle;
                if (listener != null)
                {
                    listener.Invoke(this.ClientId);
                }

                if (!IsRun)
                {
                    return;
                }

                await Mqtt_ReConnect();
            }
            catch (Exception ex)
            {
                _ = Task.Run(() => ErrorMessageDelegate("断开连接回调异常", ex));
            }
        }
        /// <summary>
        /// Makes one reconnect attempt after a short pause. Failures are reported through
        /// <c>OnErrorHandle</c>.
        /// </summary>
        /// <returns>A task that completes when the attempt has finished.</returns>
        private async Task Mqtt_ReConnect()
        {
            try
            {
                // Pause before retrying. NOTE(review): MQTTnet appears to raise the disconnected event
                // again when a connect attempt fails, which would make an immediate retry spin while
                // the broker is unreachable; confirm for the MQTTnet version in use.
                await Task.Delay(second);

                // Re-check after the pause: the client may have been stopped, replaced or may
                // already be connected again.
                var client = mqtt;
                if (client == null || !IsRun || client.IsConnected)
                    return;

                await client.ReconnectAsync();//.ConnectAsync(mqtt.Options, default);
            }
            catch (Exception ex)
            {
                _ = Task.Run(() => ErrorMessageDelegate("断线重连异常", ex));
            }
        }




        /// <summary>
        /// Message-received event handler: copies the topic and payload, then hands them to
        /// <c>ApplicationMessageReceivedAsync</c> on the thread pool so the client is not held up.
        /// </summary>
        /// <param name="arg">Event data carrying the received application message.</param>
        /// <returns>An already completed task; processing continues in the background.</returns>
        private Task Mqtt_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
        {
            try
            {
                // Topic of the message.
                string topic = arg.ApplicationMessage.Topic;
                // Copy the payload exactly once and do it here, before returning. The earlier code
                // copied it a second time inside the background task, i.e. after this handler had
                // returned. NOTE(review): depending on the MQTTnet version the payload may be backed by
                // a buffer the library reuses at that point; confirm. ToArray() never returns
                // null, so the former null check was dead code.
                byte[] payload = arg.ApplicationMessage.Payload.ToArray();
                _ = Task.Run(() => ApplicationMessageReceivedAsync(topic, payload));
            }
            catch (Exception ex)
            {
                _ = Task.Run(() => ErrorMessageDelegate("MQTT数据到达事件异常", ex));
            }

            return Task.CompletedTask;
        }
        /// <summary>
        /// Dispatches a received message: first to a pending Publish&lt;T&gt; call waiting on the topic,
        /// otherwise to the route table, otherwise to <c>OnMessageHandle</c>.
        /// </summary>
        /// <param name="Topic">Topic the message arrived on.</param>
        /// <param name="binary">Raw payload; decoded as UTF-8 for the reply channel and <c>OnMessageHandle</c>.</param>
        private void ApplicationMessageReceivedAsync(string Topic, byte[] binary)
        {
            try
            {
                // Message body as text.
                string str_payload = Encoding.UTF8.GetString(binary);

                // The table is shared with Publish<T>, which runs on other threads; Dictionary is not
                // safe for concurrent use, so look up and remove under a lock.
                bool isReply;
                MpscChannel<string> msg;
                lock (_publish_ret)
                {
                    isReply = _publish_ret.TryGetValue(Topic, out msg);
                    if (isReply)
                        _publish_ret.Remove(Topic);
                }

                if (isReply)
                {
                    try
                    {
                        // Unsubscribe before releasing the waiter, so a follow-up request on the same
                        // reply topic cannot have its new subscription removed by this call.
                        mqtt?.UnsubscribeAsync(Topic).Wait();
                    }
                    finally
                    {
                        // Deliver the reply even when unsubscribing failed; the failure is still
                        // reported by the outer catch.
                        msg.Send(str_payload);
                    }
                    return;
                }

                if (this._routeHandle != null && this._routeHandle.ActionRoute(Topic, binary, this))
                    return;
                OnMessageHandle?.Invoke(Topic, str_payload);
            }
            catch (Exception ex)
            {
                _ = Task.Run(() => ErrorMessageDelegate("MQTT数据到达事件异常", ex));
                return;
            }
        }

        /// <summary>
        /// Forwards an error to <c>OnErrorHandle</c>, if one is registered.
        /// </summary>
        /// <param name="Name">Short description of the operation that failed.</param>
        /// <param name="exception">The exception that was caught.</param>
        private void ErrorMessageDelegate( string Name, Exception exception)
        {
            var listener = OnErrorHandle;
            if (listener is null)
            {
                return;
            }

            listener.Invoke(this.ClientId, Name, exception);
        }
    }
}
